package com.kool.kmqtt.server.processer;

import com.kool.kmqtt.server.PacketSender;
import com.kool.kmqtt.server.constant.PacketTypeEnum;
import com.kool.kmqtt.server.encoder.UnsubscribePacketEncoder;
import com.kool.kmqtt.server.exception.ErrorCode;
import com.kool.kmqtt.server.exception.ProtocolException;
import com.kool.kmqtt.server.packet.FixedHeader;
import com.kool.kmqtt.server.packet.Packet;
import com.kool.kmqtt.server.packet.UnsubscribePayload;
import com.kool.kmqtt.server.packet.UnsubscribeVariableHeader;
import com.kool.kmqtt.server.parser.PacketParser;
import io.netty.channel.ChannelHandlerContext;

import java.util.List;

/**
 * UNSUBSCRIBE报文处理器
 */
public class UnsubscribePacketProcessor extends PacketProcessor {
    public UnsubscribePacketProcessor(ChannelHandlerContext ctx, PacketParser packetParser) {
        super(ctx, packetParser);
    }

    @Override
    protected void validate(Packet packet) {
        //UNSUBSCRIBE报文固定报头的第3,2,1,0位是保留位且必须分别设置为0,0,1,0
        FixedHeader fixedHeader = packet.getFixedHeader();
        if (fixedHeader.getFlags() != 2) {
            throw new ProtocolException(ErrorCode.UNSUBSCRIBE_FLAGS_ERROR);
        }

        //UNSUBSCRIBE报文的有效载荷必须至少包含一个主题过滤器
        UnsubscribePayload payload = (UnsubscribePayload) packet.getPayload();
        if (payload.getTopicFilters() == null || payload.getTopicFilters().size() == 0) {
            throw new ProtocolException(ErrorCode.UNSUBSCRIBE_INFO_NULL);
        }
    }

    @Override
    protected void processPacket(Packet packet) {
        int packetId = packet.getPacketId();
        String clientId = sessionContext.getClientId();
        UnsubscribePayload payload = (UnsubscribePayload) packet.getPayload();
        List<String> topicFilters = payload.getTopicFilters();
        //删除客户端的主题过滤器字符串完全相等的订阅信息
        for (String topicFilter : topicFilters) {
            repository.deleteSubscribeInfo(clientId, topicFilter);
        }

        FixedHeader fixedHeader = new FixedHeader();
        fixedHeader.setPacketType(PacketTypeEnum.UNSUBACK.getCode());
        fixedHeader.setFlags(0);
        fixedHeader.setRemainingLength(2);

        UnsubscribeVariableHeader unsubscribeVariableHeader = new UnsubscribeVariableHeader();
        unsubscribeVariableHeader.setPacketId(packetId);

        Packet unsubackPacket = new Packet();
        unsubackPacket.setFixedHeader(fixedHeader);
        unsubackPacket.setVariableHeader(unsubscribeVariableHeader);

        //响应UNSUBACK报文
        PacketSender packetSender = new PacketSender(sessionContext, new UnsubscribePacketEncoder());
        packetSender.send(unsubackPacket);
    }
}
